package org.example.launch

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.spark.sql.EsSparkSQL
import org.example.client.DbClient
import org.example.common.{Logging, Sparking}
import org.example.constant.ApolloConst
import scalikejdbc.{NamedDB, SQL}

/**
 * Driver-to-vehicle association table (zcov.vehicle_driver_archive_rel).
 *
 * Reads risk detail documents from Elasticsearch, aggregates them per
 * (vehicle, driver) combination with Spark SQL, and REPLACEs the result
 * into MySQL.
 */
object vehicleDriverArchiveRel extends Sparking with Logging {

  /**
   * Loads all documents from the ES index `risk_detail_index` and aggregates
   * them into one row per (vehicleNo, vehicleColor, cardDriverLicence,
   * driverName) combination.
   *
   * Output columns (all strings):
   *   vehicle_no, vehicle_color, driver_license_number,
   *   driver_certificate_number, driver_name, data_source (constant "2"),
   *   gmt_modified (today's date, yyyy-MM-dd), association_num (record count).
   *
   * @param sparkSession active session carrying the es.* connection settings
   * @return the aggregated association DataFrame
   */
  def getvehicleDriverArchiveRel(sparkSession: SparkSession): DataFrame = {
    // Match-all query: pull every document from the index, no filter.
    val builder = new SearchSourceBuilder()
    builder.query(QueryBuilders.matchAllQuery())
    val esQuery = builder.toString

    // The redundant .toDF(...) repeating the same column names was dropped;
    // select(...) already yields exactly these columns.
    val frameRisk: DataFrame = EsSparkSQL
      .esDF(sparkSession, "risk_detail_index/_doc", esQuery)
      .select("vehicleNo", "vehicleColor", "cardDriverLicence", "driverName", "processDate")
    frameRisk.show(3, false) // debug sample
    frameRisk.createOrReplaceTempView("risk_detail_index")

    // NOTE(review): driver_license_number and driver_certificate_number are both
    // sourced from cardDriverLicence — confirm this duplication is intentional.
    val sql =
      s"""
        |SELECT cast(vehicleNo as string) as vehicle_no,
        |cast(vehicleColor as string) as vehicle_color,
        |cast(cardDriverLicence as string) as driver_license_number,
        |cast(cardDriverLicence as string) as driver_certificate_number,
        |cast(driverName as string) driver_name,
        |cast('2' as string) as data_source,
        |cast(DATE_FORMAT(current_date(),'yyyy-MM-dd') as string) as  gmt_modified,
        |cast(count(*) as string) as  association_num
        |from risk_detail_index
        |where processDate is not null
        |group by vehicleNo ,vehicleColor  ,cardDriverLicence ,driverName
        |""".stripMargin
    val dataFrame: DataFrame = sparkSession.sql(sql)
    dataFrame.show(10, false) // debug sample
    dataFrame
  }

  /**
   * Entry point: builds the association DataFrame and REPLACEs every row into
   * MySQL, using one connection-pool init and one JDBC session per Spark
   * partition (the original initialized the pool once per row).
   */
  def main(args: Array[String]): Unit = {
    // 1. Build the session with Hive + Elasticsearch connectivity.
    val sparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", true) // enable Hive dynamic partitions
      .config("hive.exec.dynamic.partition.mode", "nonstrict") // non-strict mode
      .config("hive.metastore.uris", ApolloConst.hiveMetastore) // e.g. "thrift://host:9083"
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.crossJoin.enabled", "true")
      .config("spark.sql.caseSensitive", "true")
      .config("es.nodes", ApolloConst.esNodes)
      .config("es.port", ApolloConst.esPort)
      // FIX: removed the preceding duplicate setting with value "true" — it was
      // not a valid field list and was immediately overwritten by this one.
      .config("es.read.field.as.array.include", "warnList,processCommentList")
      .config("es.mapping.date.rich", "false")
      .getOrCreate()

    val result = getvehicleDriverArchiveRel(sparkSession)

    // Runs on the executors: init the pool once per partition, then stream rows.
    result.foreachPartition { rows: Iterator[Row] =>
      if (rows.nonEmpty) {
        val poolName = "vehicle_driver_archive_rel"
        DbClient.init(poolName,
          ApolloConst.jgdMysqlDriver, ApolloConst.jgdMysqlURL,
          ApolloConst.jgdMysqlUserName,
          ApolloConst.jgdMysqlPassWord)
        DbClient.usingDB(poolName) { db: NamedDB =>
          val sqlStr =
            s"""
               |replace INTO zcov.vehicle_driver_archive_rel (vehicle_no, vehicle_color, driver_license_number, driver_certificate_number, driver_name, data_source, association_num, gmt_modified) VALUES(?,?,?,?,?,?,?,?)
               |""".stripMargin
          db autoCommit { implicit session =>
            rows.foreach { row =>
              // BUG FIX: bind by column name. The original bound positional
              // indices 0..7, but the SELECT emits gmt_modified at index 6 and
              // association_num at index 7 while the INSERT column list has
              // association_num before gmt_modified — the two values were swapped.
              SQL(sqlStr).bind(
                row.getAs[String]("vehicle_no"),
                row.getAs[String]("vehicle_color"),
                row.getAs[String]("driver_license_number"),
                row.getAs[String]("driver_certificate_number"),
                row.getAs[String]("driver_name"),
                row.getAs[String]("data_source"),
                row.getAs[String]("association_num"),
                row.getAs[String]("gmt_modified")
              ).update().apply()
            }
          }
        }
      }
    }
    sparkSession.close()
  }
}
